package hbasecli.binary.service;

import com.google.protobuf.ByteString;
import hbasecli.binary.model.*;
import hbasecli.connection.HBaseConnections;
import hbasecli.exception.ParamException;
import hbasecli.model.bytes.CellInfo;
import hbasecli.model.bytes.RowInfo;
import hbasecli.service.HBaseService;
import org.apache.hadoop.hbase.client.Connection;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.*;

import static org.springframework.util.CollectionUtils.isEmpty;
import static org.springframework.util.StringUtils.hasText;

@Service
public class HBaseServiceForBinary extends HBaseService {

    @Resource
    private HBaseConnections connections;

    public RowObjectsResponse query(QueryParam param) {
        String connectionId = param.getConnectionId();
        if (!hasText(connectionId)) {
            throw new ParamException("未知的连接");
        }
        String tableName = param.getTableName();
        if (!hasText(tableName)) {
            throw new ParamException("表名不可以为空值");
        }
        Connection connection = connections.get(connectionId);
        checkConnection(connection);
        byte[] rowKeyPattern = param.getRowKeyPattern().toByteArray();
        List<RowInfo> rowInfoList;
        if (rowKeyPattern == null || rowKeyPattern.length == 0) {
            rowInfoList = scan(connection, tableName, null, null);
        } else {
            rowInfoList = fuzzyScan(connection, tableName, rowKeyPattern, '_', null);
        }
        RowObjectsResponse.Builder builder = RowObjectsResponse.newBuilder();
        builder.setCode(0).setMessage("");
        RowObject.Builder rowObjectBuilder = RowObject.newBuilder();
        CellObject.Builder cellObjectBuilder = CellObject.newBuilder();
        for (RowInfo rowInfo : rowInfoList) {
            rowObjectBuilder.setRowKey(ByteString.copyFrom(rowInfo.getRowKey()));
            Collection<CellInfo> cells = rowInfo.getCells();
            for (CellInfo cell : cells) {
                cellObjectBuilder.setFamily(ByteString.copyFrom(cell.getFamily()));
                cellObjectBuilder.setQualifier(ByteString.copyFrom(cell.getQualifier()));
                cellObjectBuilder.setValue(ByteString.copyFrom(cell.getValue()));
                rowObjectBuilder.addCells(cellObjectBuilder.build());
                cellObjectBuilder.clear();
            }
            builder.addData(rowObjectBuilder.build());
            rowObjectBuilder.clear();
        }
        return builder.build();
    }

    public BooleanResponse createRow(CreateRowParam param) {
        String connectionId = param.getConnectionId();
        if (!hasText(connectionId)) {
            throw new ParamException("未知的连接");
        }
        String tableName = param.getTableName();
        if (!hasText(tableName)) {
            throw new ParamException("表名不可以为空值");
        }
        byte[] rowKey = param.getRowKey().toByteArray();
        if (rowKey == null || rowKey.length == 0) {
            throw new ParamException("行键不可以为空值");
        }
        List<CellParam> cells = param.getCellsList();
        if (isEmpty(cells)) {
            throw new ParamException("至少要有一条有效的数据");
        }
        Connection connection = connections.get(connectionId);
        checkConnection(connection);
        List<byte[][]> familyQualifierValues = new ArrayList<>();
        for (CellParam cell : cells) {
            byte[] family = cell.getFamily().toByteArray();
            byte[] qualifier = cell.getQualifier().toByteArray();
            byte[] value = cell.getValue().toByteArray();
            familyQualifierValues.add(new byte[][]{family, qualifier, value});
        }
        return BooleanResponse.newBuilder().setCode(0).setMessage("").setData(
                put(connection, tableName, rowKey, familyQualifierValues)
        ).build();
    }

    public BooleanResponse deleteRow(DeleteRowParam param) {
        String connectionId = param.getConnectionId();
        if (!hasText(connectionId)) {
            throw new ParamException("未知的连接");
        }
        String tableName = param.getTableName();
        if (!hasText(tableName)) {
            throw new ParamException("表名不可以为空值");
        }
        List<ByteString> rowKeysList = param.getRowKeysList();
        if (isEmpty(rowKeysList)) {
            throw new ParamException("至少要有一个行键");
        }
        Connection connection = connections.get(connectionId);
        checkConnection(connection);
        List<byte[]> list = new ArrayList<>(rowKeysList.size());
        for (ByteString bytes : rowKeysList) {
            list.add(bytes.toByteArray());
        }
        return BooleanResponse.newBuilder().setCode(0).setMessage("").setData(
                deleteRow(connection, tableName, list)
        ).build();
    }

}